/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>

#include <thrift/transport/TFileTransport.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/concurrency/FunctionRunner.h>
#include <thrift/concurrency/Util.h>
#include <boost/format.hpp>

#include <fcntl.h>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <limits>

#ifdef _WIN32
#include <Windows.h>
#endif

namespace apache {
namespace thrift {
namespace transport {

using boost::scoped_ptr;
using boost::shared_ptr;
using namespace std;
using namespace apache::thrift::protocol;
using namespace apache::thrift::concurrency;

/**
 * Opens `path` as a Thrift file transport.
 *
 * @param path       log file to read from or write to
 * @param readOnly   open the file for reading only; write() will then throw
 * @param appendMode when the file is writable, keep its existing contents
 *                   instead of truncating it on open (see openLogFile())
 * @throws TTransportException if the file cannot be opened
 */
TFileTransport::TFileTransport(const string& path, bool readOnly, bool appendMode) :
  readState_(),
  readBuff_(NULL),
  currentEvent_(NULL),
  readBuffSize_(DEFAULT_READ_BUFF_SIZE),
  readTimeout_(NO_TAIL_READ_TIMEOUT),
  chunkSize_(DEFAULT_CHUNK_SIZE),
  eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
  flushMaxUs_(DEFAULT_FLUSH_MAX_US),
  flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
  maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
  maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
  eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
  corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
  writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US),
  dequeueBuffer_(NULL),
  enqueueBuffer_(NULL),
  notFull_(&mutex_),
  notEmpty_(&mutex_),
  closing_(false),
  flushed_(&mutex_),
  forceFlush_(false),
  filename_(path),
  bufferAndThreadInitialized_(false),
  offset_(0),
  lastBadChunk_(0),
  numCorruptedEventsInChunk_(0),
  readOnly_(readOnly),
  appendMode_(appendMode)
{
  // fd_ starts out as the platform's "no file" sentinel; openLogFile() below
  // replaces it with the real descriptor/handle.
#ifdef _WIN32
  fd_ = INVALID_HANDLE_VALUE;
#else
  fd_ = 0;
#endif

  // Keep the writer thread joinable: the destructor join()s it.
  threadFactory_.setDetached(false);

  openLogFile();
}

/**
 * Switches the transport to a new output file.
 *
 * If a file is currently open, pending events are flushed and the old
 * descriptor/handle is closed first. If `fd` refers to an open file it is
 * adopted as-is; otherwise `filename` is opened via openLogFile().
 *
 * @param fd       descriptor/handle to adopt, or 0 (POSIX) /
 *                 INVALID_HANDLE_VALUE (Windows) to open `filename` instead
 * @param filename name of the new output file
 * @param offset   offset to record for the new file (note: openLogFile()
 *                 resets offset_ to 0 when it is called)
 * @throws TTransportException if closing the old file or opening the new one fails
 */
void TFileTransport::resetOutputFile(THRIFT_HANDLE fd, string filename, off_t offset)
{
  // Remember the previous name so the diagnostic below names the file that
  // is actually being closed, not the one replacing it.
  const string previousFilename = filename_;
  filename_ = filename;
  offset_ = offset;

#ifdef _WIN32
  // check if current file is still open
  if (fd_ != INVALID_HANDLE_VALUE) {
    // flush any events in the queue
    flush();
    TLogging::log_info("error, current file (%s) not closed", previousFilename.c_str());
    // close logfile
    if (FALSE == ::CloseHandle(fd_)) {
      int errno_copy = ::GetLastError();
      TLogging::log_err("TFileTransport: resetOutputFile() ::close(), errno: %d", errno_copy);
      throw TTransportException(TTransportException::UNKNOWN,
                                "TFileTransport: error in file close",
                                errno_copy);
    }
    // successfully closed the handle
    fd_ = INVALID_HANDLE_VALUE;
  }

  if (fd != INVALID_HANDLE_VALUE) {
    fd_ = fd;
  } else {
    // no handle supplied: open `filename` ourselves
    openLogFile();
  }
#else
  // check if current file is still open
  if (fd_ > 0) {
    // flush any events in the queue
    flush();
    TLogging::log_info("error, current file (%s) not closed", previousFilename.c_str());
    if (-1 == ::close(fd_)) {
      int errno_copy = errno;
      TLogging::log_err("TFileTransport: resetOutputFile() ::close(), errno: %d", errno_copy);
      throw TTransportException(TTransportException::UNKNOWN,
                                "TFileTransport: error in file close",
                                errno_copy);
    }
    // successfully closed fd
    fd_ = 0;
  }

  if (fd > 0) {
    fd_ = fd;
  } else {
    // open file if the input fd is 0
    openLogFile();
  }
#endif
}

/**
 * Shuts the transport down.
 *
 * If a writer thread is running, it is told to drain both event buffers and
 * exit, and is then joined. Afterwards the event buffers, the read buffer and
 * any partially consumed event are freed, and the log file is closed if it is
 * still open. Close failures are logged, never thrown.
 */
TFileTransport::~TFileTransport()
{
  // flush the buffer if a writer thread is active
  if (writerThread_.get()) {
    {
      // Set closing_ and signal while holding mutex_. swapEventBuffers()
      // tests closing_ under mutex_ immediately before waiting on notEmpty_,
      // so doing both under the same lock means the wake-up cannot slip in
      // between that test and the wait.
      Guard g(mutex_);
      closing_ = true;
      // Since closing_ is true, the writer will flush all data, then exit.
      notEmpty_.notify();
    }

    // mutex_ must be released before joining: the writer thread needs it.
    writerThread_->join();
    writerThread_.reset();
  }

  // delete on NULL is a no-op, so no guards are needed here
  delete dequeueBuffer_;
  dequeueBuffer_ = NULL;

  delete enqueueBuffer_;
  enqueueBuffer_ = NULL;

  delete[] readBuff_;
  readBuff_ = NULL;

  delete currentEvent_;
  currentEvent_ = NULL;

#ifdef _WIN32
  // close logfile
  if (fd_ != INVALID_HANDLE_VALUE) {
    if (FALSE == ::CloseHandle(fd_)) {
      TLogging::log_err("TFileTransport: ~TFileTransport() ::close(), errno: %d" , ::GetLastError());
    } else {
      // successfully closed fd
      fd_ = INVALID_HANDLE_VALUE;
    }
  }
#else
  // close logfile
  if (fd_ > 0) {
    if (-1 == ::close(fd_)) {
      TLogging::log_err("TFileTransport: ~TFileTransport() ::close(), errno: %d", errno);
    } else {
      // successfully closed fd
      fd_ = 0;
    }
  }
#endif
}

bool TFileTransport::initBufferAndWriteThread()
{
  if (bufferAndThreadInitialized_) {
    TLogging::log_err("Trying to double-init TFileTransport");
    return false;
  }

  if (!writerThread_.get()) {
    writerThread_ = threadFactory_.newThread(FunctionRunner::create(startWriterThread, this));
    writerThread_->start();
  }

  dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
  enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
  bufferAndThreadInitialized_ = true;

  return true;
}

/**
 * Queues `len` bytes from `buf` as a single event for the writer thread.
 *
 * @throws TTransportException if the transport was opened read-only
 */
void TFileTransport::write(const uint8_t* buf, uint32_t len)
{
  if (!readOnly_) {
    enqueueEvent(buf, len);
    return;
  }

  throw TTransportException("TFileTransport: attempting to write to file opened readonly\n");
}

/**
 * Frames `buf` as one event and appends it to the enqueue buffer for the
 * writer thread, starting the thread on first use.
 *
 * The on-disk frame is a 4-byte length (copied from `eventLen` in host byte
 * order) followed by the payload. The event is dropped when the transport is
 * closing (silently) or, with an error log, when `eventLen` is 0, exceeds
 * maxEventSize_ (if set), or is too large for its length to be framed.
 * Blocks while the enqueue buffer is full.
 *
 * @param buf      payload bytes
 * @param eventLen number of payload bytes
 * @throws std::bad_alloc if the event cannot be allocated
 */
void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen)
{
  // can't enqueue more events if file is going to close
  if (closing_) {
    return;
  }

  // make sure that event size is valid
  if ((maxEventSize_ > 0) && (eventLen > maxEventSize_)) {
    TLogging::log_err("msg size is greater than max event size: %u > %u\n", eventLen , maxEventSize_);
    return;
  }

  if (eventLen == 0) {
    TLogging::log_err("cannot enqueue an empty event");
    return;
  }

  // The framed size (payload + 4-byte length prefix) is computed in uint32_t
  // below; reject lengths for which that sum would wrap around.
  if (eventLen > (std::numeric_limits<uint32_t>::max)() - 4) {
    TLogging::log_err("msg size is too large to be framed: %u\n", eventLen);
    return;
  }

  eventInfo* toEnqueue = new eventInfo();
  try {
    // plain new[] reports failure by throwing std::bad_alloc, never NULL
    toEnqueue->eventBuff_ = new uint8_t[(sizeof(uint8_t) * eventLen) + 4];
  } catch (...) {
    // don't leak the event descriptor if the payload allocation fails
    delete toEnqueue;
    throw;
  }
  // first 4 bytes is the event length
  memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
  // actual event contents
  memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
  toEnqueue->eventSize_ = eventLen + 4;

  // lock mutex
  Guard g(mutex_);

  // make sure that enqueue buffer is initialized and writer thread is running
  if (!bufferAndThreadInitialized_) {
    if (!initBufferAndWriteThread()) {
      delete toEnqueue;
      return;
    }
  }

  // Can't enqueue while buffer is full
  while (enqueueBuffer_->isFull()) {
    notFull_.wait();
  }

  // We shouldn't be trying to enqueue new data while a forced flush is
  // requested.  (Otherwise the writer thread might not ever be able to finish
  // the flush if more data keeps being enqueued.)
  assert(!forceFlush_);

  // add to the buffer
  if (!enqueueBuffer_->addEvent(toEnqueue)) {
    delete toEnqueue;
    return;
  }

  // signal anybody who's waiting for the buffer to be non-empty
  notEmpty_.notify();
}

/**
 * Called by the writer thread to obtain its next batch of events.
 *
 * If the enqueue buffer already holds events, it is swapped with the dequeue
 * buffer immediately. Otherwise, unless the transport is closing, this waits
 * on notEmpty_ (until `*deadline` when given, else without timeout). It swaps
 * afterwards only if events actually arrived while waiting.
 *
 * @param deadline wake-up time for the timed wait (writerThread() passes the
 *                 value computed by getNextFlushTime()), or NULL to wait
 *                 without a timeout
 * @return true if the buffers were swapped, i.e. dequeueBuffer_ now holds
 *         events to write
 */
bool TFileTransport::swapEventBuffers(int64_t* deadline)
{
  Guard g(mutex_);

  bool swap;
  if (!enqueueBuffer_->isEmpty()) {
    swap = true;
  } else if (closing_) {
    // even though there is no data to write,
    // return immediately if the transport is closing
    swap = false;
  } else {
    if (deadline != NULL) {
      // if we were handed a deadline, do a timed wait
      notEmpty_.waitForTimeUsec(*deadline);
    } else {
      // just wait until the buffer gets an item
      notEmpty_.wait();
    }

    // The buffer is still empty if we timed out (or were woken by flush(),
    // the destructor, or spuriously); only swap when there is data to write.
    swap = !enqueueBuffer_->isEmpty();
  }

  if (swap) {
    TFileTransportBuffer* temp = enqueueBuffer_;
    enqueueBuffer_ = dequeueBuffer_;
    dequeueBuffer_ = temp;
    // The former dequeue buffer (reset by writerThread() after each batch)
    // now accepts events again, so unblock producers waiting for room.
    notFull_.notify();
  }

  return swap;
}

void TFileTransport::writerThread()
{
  bool hasIOError = false;

  // open file if it is not open
  if (!fd_) {
    try {
      openLogFile();
    }
  catch (...) {
      TLogging::log_err("TFileTransport: writerThread() openLogFile(), unknown exception");
      fd_ = 0;
      hasIOError = true;
    }
  }

  // set the offset to the correct value (EOF)
  if (!hasIOError) {
    try {
      seekToEnd();
      // throw away any partial events
      offset_ += readState_.lastDispatchPtr_;

#ifdef _WIN32
    LONG lowLength = (LONG)(offset_ & 0xFFFFFFFF);
    LONG hiLength = (LONG)(offset_ >> 32);
    ::SetFilePointer(fd_, lowLength, &hiLength, FILE_BEGIN);
    ::SetEndOfFile(fd_);
#else
    ::ftruncate(fd_, offset_);
#endif
      readState_.resetAllValues();
    }
  catch (...) {
      TLogging::log_err("TFileTransport: writerThread() initialization, unknown exception");
      hasIOError = true;
    }
  }

  // Figure out the next time by which a flush must take place
  int64_t ts_next_flush;
  getNextFlushTime(&ts_next_flush);
  uint32_t unflushed = 0;

  while (1) {
    // this will only be true when the destructor is being invoked
    if (closing_) {
      if (hasIOError) {
        return;
      }

      // Try to empty buffers before exit
      if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
#ifdef _WIN32
        ::FlushFileBuffers(fd_);
        if (FALSE == ::CloseHandle(fd_)) {
          int errno_copy = ::GetLastError();
          TLogging::log_err("TFileTransport: writerThread() ::close(), errno: %d", errno_copy);
        }
        else {
          // fd successfully closed
          fd_ = INVALID_HANDLE_VALUE;
        }
#else
      ::fsync(fd_);
      if (-1 == ::close(fd_)) {
        int errno_copy = errno;
        TLogging::log_err("TFileTransport: writerThread() ::close(), errno: %d", errno_copy);
      }
      else {
      // fd successfully closed
       fd_ = 0;
      }
#endif
        return;
      }
    }

    if (swapEventBuffers(&ts_next_flush)) {
      eventInfo* outEvent;
      while (NULL != (outEvent = dequeueBuffer_->getNext())) {
        // Remove an event from the buffer and write it out to disk. If there is any IO error, for
        // instance,
        // the output file is unmounted or deleted, then this event is dropped. However, the writer
        // thread
        // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then
        // start writing
        // from the end.

        while (hasIOError) {
          TLogging::log_err("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors\n", writerThreadIOErrorSleepTime_);
          Util::thrift_usleep(writerThreadIOErrorSleepTime_);
          if (closing_) {
            return;
          }
          if (!fd_) {
#ifdef _WIN32
            ::CloseHandle(fd_);
            fd_ = INVALID_HANDLE_VALUE;
#else
            ::close(fd_);
            fd_ = 0;
#endif
          }
          try {
            openLogFile();
            seekToEnd();
            unflushed = 0;
            hasIOError = false;
            TLogging::log_info("TFileTransport: log file %s reopened by writer thread during error recovery\n", filename_.c_str());
          }
          catch (...) {
            TLogging::log_err("TFileTransport: unable to reopen log file %s during error recovery\n", filename_.c_str());
          }
        }

        // sanity check on event
        if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
          TLogging::log_err("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
          continue;
        }

        // If chunking is required, then make sure that msg does not cross chunk boundary
        if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
          // event size must be less than chunk size
          if (outEvent->eventSize_ > chunkSize_) {
            TLogging::log_info("TFileTransport: event size(%u) > chunk size(%u): skipping event\n", outEvent->eventSize_, chunkSize_);
            continue;
          }

          int64_t chunk1 = offset_ / chunkSize_;
          int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1) / chunkSize_;

          // if adding this event will cross a chunk boundary, pad the chunk with zeros
          if (chunk1 != chunk2) {
            // refetch the offset to keep in sync
#ifdef _WIN32
            int64_t newOffset = 0;
            LONG hiOffset = (LONG)(newOffset >> 32);
            DWORD offset = ::SetFilePointer(fd_, (LONG)(newOffset & 0xFFFFFFFF), &hiOffset, FILE_CURRENT);
            offset_ = offset | ((int64_t)hiOffset << 32);
#else
            offset_ = ::lseek(fd_, 0, SEEK_CUR);
#endif
            int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_);

            uint8_t* zeros = new uint8_t[padding];
            memset(zeros, '\0', padding);
            boost::scoped_array<uint8_t> array(zeros);
#ifdef _WIN32
            DWORD dw;
            BOOL rv = ::WriteFile(fd_, zeros, padding, &dw, NULL);
            if (FALSE == rv) {
              int errno_copy = ::GetLastError();
              TLogging::log_err("TFileTransport: writerThread() error while padding zeros, errno: %d", errno_copy);
              hasIOError = true;
              continue;
            }
#else
            int rv = HANDLE_EINTR(::write(fd_, zeros, padding));
            if (-1 == rv) {
              int errno_copy = errno;
              TLogging::log_err("TFileTransport: writerThread() error while padding zeros, errno: %d", errno_copy);
              hasIOError = true;
              continue;
            }
#endif
            unflushed += padding;
            offset_ += padding;
          }
        }

        // write the dequeued event to the file
        if (outEvent->eventSize_ > 0) {
#ifdef _WIN32
          DWORD dw;
          BOOL rv = ::WriteFile(fd_, outEvent->eventBuff_, outEvent->eventSize_, &dw, NULL);
          if (rv == FALSE) {
            int errno_copy = ::GetLastError();
            TLogging::log_err("TFileTransport: error while writing event, errno: %d", errno_copy);
            hasIOError = true;
            continue;
          }
#else
          int rv = HANDLE_EINTR(::write(fd_, outEvent->eventBuff_, outEvent->eventSize_));
          if (-1 == rv) {
            int errno_copy = errno;
            TLogging::log_err("TFileTransport: error while writing event, errno: %d", errno_copy);
            hasIOError = true;
            continue;
          }
#endif
          unflushed += outEvent->eventSize_;
          offset_ += outEvent->eventSize_;
        }
      }
      dequeueBuffer_->reset();
    }

    if (hasIOError) {
      continue;
    }

    // Local variable to cache the state of forceFlush_.
    //
    // We only want to check the value of forceFlush_ once each time around the
    // loop.  If we check it more than once without holding the lock the entire
    // time, it could have changed state in between.  This will result in us
    // making inconsistent decisions.
    bool forced_flush = false;
    {
      Guard g(mutex_);
      if (forceFlush_) {
        if (!enqueueBuffer_->isEmpty()) {
          // If forceFlush_ is true, we need to flush all available data.
          // If enqueueBuffer_ is not empty, go back to the start of the loop to
          // write it out.
          //
          // We know the main thread is waiting on forceFlush_ to be cleared,
          // so no new events will be added to enqueueBuffer_ until we clear
          // forceFlush_.  Therefore the next time around the loop enqueueBuffer_
          // is guaranteed to be empty.  (I.e., we're guaranteed to make progress
          // and clear forceFlush_ the next time around the loop.)
          continue;
        }
        forced_flush = true;
      }
    }

    // determine if we need to perform an fsync
    bool flush = false;
    if (forced_flush || unflushed > flushMaxBytes_) {
      flush = true;
    } 
  else {
    int64_t current_time = Util::currentTimeUsec();
      
      if (current_time > ts_next_flush) {
        if (unflushed > 0) {
          flush = true;
        } else {
          // If there is no new data since the last fsync,
          // don't perform the fsync, but do reset the timer.
          getNextFlushTime(&ts_next_flush);
        }
      }
    }

    if (flush) {
      // sync (force flush) file to disk
#ifdef _WIN32
      ::FlushFileBuffers(fd_);
#else
      ::fsync(fd_);
#endif
      unflushed = 0;
      getNextFlushTime(&ts_next_flush);

      // notify anybody waiting for flush completion
      if (forced_flush) {
        Guard g(mutex_);
        forceFlush_ = false;
        assert(enqueueBuffer_->isEmpty());
        assert(dequeueBuffer_->isEmpty());
        flushed_.notifyAll();
      }
    }
  }
}

void TFileTransport::flush()
{
  // file must be open for writing for any flushing to take place
  if (!writerThread_.get()) {
    return;
  }
  // wait for flush to take place
  Guard g(mutex_);

  // Indicate that we are requesting a flush
  forceFlush_ = true;
  // Wake up the writer thread so it will perform the flush immediately
  notEmpty_.notify();

  while (forceFlush_) {
    flushed_.wait();
  }
}

/**
 * Reads exactly `len` bytes into `buf`, possibly spanning several events.
 *
 * @return `len`
 * @throws TEOFException if read() returns 0 before `len` bytes were gathered
 */
uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len)
{
  uint32_t total = 0;

  while (total < len) {
    const uint32_t got = read(buf + total, len - total);
    // read() returns an unsigned count, so 0 is the only "nothing" value
    if (got == 0) {
      throw TEOFException();
    }
    total += got;
  }

  return total;
}

bool TFileTransport::peek()
{
  // check if there is an event ready to be read
  if (!currentEvent_) {
    currentEvent_ = readEvent();
  }

  // did not manage to read an event from the file. This could have happened
  // if the timeout expired or there was some other error
  if (!currentEvent_) {
    return false;
  }

  // check if there is anything to read
  return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0;
}

/**
 * Copies up to `len` bytes of the current event into `buf`.
 *
 * Reads never span events: once the rest of the current event has been
 * copied, the event is freed and the next call starts on a new one.
 *
 * @return number of bytes copied; 0 if no event could be read (timeout, EOF
 *         in non-tailing mode, or some other error)
 */
uint32_t TFileTransport::read(uint8_t* buf, uint32_t len)
{
  // check if there an event is ready to be read
  if (!currentEvent_) {
    currentEvent_ = readEvent();
  }

  // did not manage to read an event from the file. This could have happened
  // if the timeout expired or there was some other error
  if (!currentEvent_) {
    return 0;
  }

  // Compare in uint32_t. `len` may exceed INT32_MAX, and such a request must
  // still take the "copy the rest" branch rather than copying `len` bytes.
  // eventBuffPos_ never exceeds eventSize_ (it only advances by less than
  // the remainder below), so this subtraction cannot wrap.
  const uint32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
  if (remaining <= len) {
    // copy over anything that's remaining
    if (remaining > 0) {
      memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, remaining);
    }
    delete currentEvent_;
    currentEvent_ = NULL;
    return remaining;
  }

  // the event is larger than the request: hand out `len` bytes of it
  memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
  currentEvent_->eventBuffPos_ += len;
  return len;
}

/**
 * Reads the next complete event from the file.
 *
 * Data is pulled from fd_ into readBuff_ (readBuffSize_ bytes at a time) and
 * parsed as a sequence of [4-byte size][payload] records:
 *  - A size of 0 is treated as padding and skipped.
 *  - A size header is never started at a position where it would straddle a
 *    chunk boundary; those bytes are skipped, matching the zero padding that
 *    writerThread() emits.
 *
 * At EOF the behavior depends on readTimeout_:
 *  - TAIL_READ_TIMEOUT:    sleep eofSleepTime_ and retry indefinitely
 *  - NO_TAIL_READ_TIMEOUT: return NULL
 *  - > 0:                  sleep once, retry, then return NULL if still at EOF
 *
 * Corrupt events (see isEventCorrupted()) trigger performRecovery(), which
 * may reposition the file or throw.
 *
 * Note: the caller is responsible for freeing returned events.
 *
 * @return a heap-allocated event with eventBuffPos_ reset to 0, or NULL
 * @throws TTransportException on a read error (POSIX branch only; on Windows
 *         a failed ReadFile() is treated like EOF)
 */
eventInfo* TFileTransport::readEvent()
{
  int readTries = 0;

  // lazily allocate the read buffer on first use
  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer
      offset_ += readState_.bufferLen_;
#ifdef _WIN32
    // a failed ReadFile() is reported as 0 bytes read, i.e. handled like EOF
    DWORD dw;
    int result = ::ReadFile(fd_, readBuff_, readBuffSize_, &dw, NULL);

    if (result == FALSE) {
      dw = 0;
    }
    readState_.bufferLen_ = static_cast<uint32_t>(dw);
#else
    int rv = HANDLE_EINTR(::read(fd_, readBuff_, readBuffSize_));
      readState_.bufferLen_ = static_cast<uint32_t>(rv);
#endif
      //       if (readState_.bufferLen_) {
      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
      //       }
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error: read() returned -1. It was stored through a uint32_t
      // cast above, so this test relies on -1 converting to the same value.
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        TLogging::log_err("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      }
    else if (readState_.bufferLen_ == 0) { // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          Util::thrift_usleep(eofSleepTime_);
          continue;
        }
    else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return NULL;
        }
    else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return NULL;
          } else {
            // sleep for the timeout (readTimeout_ * 1000 -- presumably
            // milliseconds converted to microseconds) and retry once
            Util::thrift_usleep(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
        // NOTE(review): readTimeout_ values matching none of the branches
        // above fall through; the outer loop then re-reads right away.
      }
    }

    // data is available, so a later EOF gets a fresh timeout
    readTries = 0;

    // attempt to read an event from the buffer
    while (readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        if (readState_.eventSizeBuffPos_ == 0) {
          // A 4-byte size header starting here would straddle a chunk
          // boundary; the writer never produces that (it zero-pads instead).
          if ((offset_ + readState_.bufferPtr_) / chunkSize_
              != ((offset_ + readState_.bufferPtr_ + 3) / chunkSize_)) {
            // skip one byte towards chunk boundary
            //            T_DEBUG_L(1, "Skipping a byte");
            readState_.bufferPtr_++;
            continue;
          }
        }

        // accumulate the size header one byte at a time; it may be split
        // across two fills of readBuff_
        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = readBuff_[readState_.bufferPtr_++];

        if (readState_.eventSizeBuffPos_ == 4) {
          if (readState_.getEventSize() == 0) {
            // 0 length event indicates padding
            //            T_DEBUG_L(1, "Got padding");
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          if (readState_.event_) {
            delete (readState_.event_);
          }
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = readState_.getEventSize();

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            performRecovery();
            // start from the top
            break;
          }
        }
      }
    else {
        // payload phase: allocate the event body on first entry
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          // rewind so the consumer (read()/peek()) starts at the first byte
          completeEvent->eventBuffPos_ = 0;

          // ownership moves to the caller; parsing resumes after this event
          readState_.event_ = NULL;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }
  }
}

bool TFileTransport::isEventCorrupted()
{
  // an error is triggered if:
  if ((maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
    // 1. Event size is larger than user-speficied max-event size
    TLogging::log_err("Read corrupt event. Event size(%u) greater than max event size (%u)"
      , readState_.event_->eventSize_, maxEventSize_);
    return true;
  }
  else if (readState_.event_->eventSize_ > chunkSize_) {
    // 2. Event size is larger than chunk size
    TLogging::log_err("Read corrupt event. Event size(%u) greater than chunk size (%u)\n",
      readState_.event_->eventSize_, chunkSize_);
    return true;
  }
  else if (((offset_ + readState_.bufferPtr_ - 4) / chunkSize_) != ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1) / chunkSize_)) {
    // 3. size indicates that event crosses chunk boundary
    TLogging::log_err("Read corrupt event. Event crosses chunk boundary. Event size:%u  Offset:%lu\n",
      readState_.event_->eventSize_, static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4));

    return true;
  }

  return false;
}

/**
 * Recovers from a corrupt event detected by isEventCorrupted().
 *
 * While fewer than maxCorruptedEvents_ corrupt events have been seen in the
 * current chunk, the chunk is re-read from its start. After that the reader
 * skips to the next chunk. At the last chunk it either waits for the file to
 * grow (tailing mode) or gives up.
 *
 * @throws TTransportException if the last chunk is corrupt and the file is
 *         not being tailed
 */
void TFileTransport::performRecovery()
{
  const uint32_t curChunk = getCurChunk();

  // Track how many corrupt events were seen in this particular chunk.
  if (lastBadChunk_ != curChunk) {
    lastBadChunk_ = curChunk;
    numCorruptedEventsInChunk_ = 1;
  } else {
    numCorruptedEventsInChunk_++;
  }

  if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
    // Possibly a transient problem reading from disk: re-read this chunk.
    seekToChunk(curChunk);
    return;
  }

  // Too many failures in this chunk: move past it if it is not the last one.
  if (curChunk != (getNumChunks() - 1)) {
    seekToChunk(curChunk + 1);
    return;
  }

  if (readTimeout_ == TAIL_READ_TIMEOUT) {
    // Tailing: wait until the file grows into a new chunk, then resume there.
    while (curChunk == (getNumChunks() - 1)) {
      Util::thrift_usleep(corruptedEventSleepTime_);
    }
    seekToChunk(curChunk + 1);
    return;
  }

  // Nothing left to try: rewind to the last successfully dispatched point
  // and report the error.
  readState_.resetState(readState_.lastDispatchPtr_);
  currentEvent_ = NULL;
  std::string errorMsg = boost::str(boost::format("TFileTransport: log file corrupted at offset: %lu") % static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_));

  TLogging::log_err(errorMsg.c_str());
  throw TTransportException(errorMsg);
}

/**
 * Repositions the reader at the start of chunk `chunk`.
 *
 * - Negative values count from the end (-1 is the last chunk); values that
 *   still come out negative seek to the beginning.
 * - Values past the end seek to EOF instead: the reader starts at the last
 *   chunk and reads forward, with tailing temporarily disabled, until the
 *   end offset observed at call time is reached.
 * - Read state and any partially consumed event are discarded.
 * - Does nothing for an empty file.
 *
 * @throws TTransportException if the file is not open or the seek fails
 */
void TFileTransport::seekToChunk(int32_t chunk)
{
#ifdef _WIN32
  const bool fileOpen = (fd_ != INVALID_HANDLE_VALUE);
#else
  const bool fileOpen = (fd_ > 0);
#endif
  if (!fileOpen) {
    throw TTransportException("File not open");
  }

  int32_t numChunks = getNumChunks();

  // file is empty, seeking to chunk is pointless
  if (numChunks == 0) {
    return;
  }

  // negative indicates reverse seek (from the end)
  if (chunk < 0) {
    chunk += numChunks;
  }

  // too large a value for reverse seek, just seek to beginning
  if (chunk < 0) {
    TLogging::log_debug("Incorrect value for reverse seek. Seeking to beginning...");
    chunk = 0;
  }

  // cannot seek past EOF
  bool seekToEnd = false;
  off_t minEndOffset = 0;
  if (chunk >= numChunks) {
    TLogging::log_debug("Trying to seek past EOF. Seeking to EOF instead...");
    seekToEnd = true;
    chunk = numChunks - 1;
#ifdef _WIN32
    int64_t newOffset = 0;
    LONG hiOffset = (LONG)(newOffset >> 32);
    DWORD offset = ::SetFilePointer(fd_, (LONG)(newOffset & 0xFFFFFFFF), &hiOffset, FILE_END);
    minEndOffset = offset | ((int64_t)hiOffset << 32);
#else
    // this is the min offset to process events till
    minEndOffset = ::lseek(fd_, 0, SEEK_END);
#endif
  }

#ifdef _WIN32
  int64_t newOffset = int64_t(chunk) * chunkSize_;
  LONG hiOffset = (LONG)(newOffset >> 32);
  // INVALID_SET_FILE_POINTER is also a legitimate low DWORD for offsets of
  // 4 GiB and beyond, so failure is only signalled together with
  // GetLastError(). Clear the error first so a stale value is not misread.
  ::SetLastError(NO_ERROR);
  DWORD offset = ::SetFilePointer(fd_, (LONG)(newOffset & 0xFFFFFFFF), &hiOffset, FILE_BEGIN);
  const bool seekFailed = (offset == INVALID_SET_FILE_POINTER) && (::GetLastError() != NO_ERROR);
  offset_ = offset | ((int64_t)hiOffset << 32);
#else
  off_t newOffset = off_t(chunk) * chunkSize_;
  offset_ = ::lseek(fd_, newOffset, SEEK_SET);
  const bool seekFailed = (offset_ == -1);
#endif

  readState_.resetAllValues();
  // Free (not just forget) any partially consumed event.
  delete currentEvent_;
  currentEvent_ = NULL;
  if (seekFailed) {
    TLogging::log_err("TFileTransport: lseek error in seekToChunk");
    throw TTransportException("TFileTransport: lseek error in seekToChunk");
  }

  // seek to EOF if user wanted to go to last chunk
  if (seekToEnd) {
    uint32_t oldReadTimeout = getReadTimeout();
    setReadTimeout(NO_TAIL_READ_TIMEOUT);
    // keep on reading until the last event at point of seekChunk call
    boost::scoped_ptr<eventInfo> event;
    while ((offset_ + readState_.bufferPtr_) < minEndOffset) {
      event.reset(readEvent());
      if (event.get() == NULL) {
        break;
      }
    }
    setReadTimeout(oldReadTimeout);
  }
}

/**
 * Moves the reader to the end of the file by requesting the chunk just past
 * the last one. seekToChunk() turns that into "read forward from the last
 * chunk to EOF". It is a no-op for an empty file.
 */
void TFileTransport::seekToEnd()
{
  const uint32_t onePastLastChunk = getNumChunks();
  seekToChunk(onePastLastChunk);
}

uint32_t TFileTransport::getNumChunks()
{
  int64_t file_size = 0;
#ifdef _WIN32
  if (fd_ == INVALID_HANDLE_VALUE) {
    return 0;
  }

  DWORD lowFileSize, highFileSize;

  lowFileSize = ::GetFileSize(fd_, &highFileSize);
  if (lowFileSize == INVALID_FILE_SIZE) {
    int errno_copy = ::GetLastError();
    throw TTransportException(TTransportException::UNKNOWN,
                              "TFileTransport::getNumChunks() (GetFileSize)",
                              errno_copy);
  }
  file_size = ((int64_t)highFileSize << 32) | lowFileSize;
#else
  if (fd_ <= 0) {
    return 0;
  }
  struct stat f_info;
  int rv = ::fstat(fd_, &f_info);

  if (rv < 0) {
    int errno_copy = errno;
    throw TTransportException(TTransportException::UNKNOWN,
                              "TFileTransport::getNumChunks() (fstat)",
                              errno_copy);
  }
  file_size = f_info.st_size;
#endif

  if (file_size > 0) {
    size_t numChunks = ((file_size) / chunkSize_) + 1;
    if (numChunks > (std::numeric_limits<uint32_t>::max)()) {
      throw TTransportException("Too many chunks");
    }
    return static_cast<uint32_t>(numChunks);
  }

  // empty file has no chunks
  return 0;
}

/**
 * Index of the chunk containing offset_, the file offset at which the
 * currently buffered read data begins.
 */
uint32_t TFileTransport::getCurChunk()
{
  const uint32_t chunkIndex = static_cast<uint32_t>(offset_ / chunkSize_);
  return chunkIndex;
}

// Utility Functions
/**
 * Opens filename_ and stores the descriptor/handle in fd_, resetting
 * offset_ to 0.
 *
 * Mode selection:
 *  - read-only:             the file must already exist
 *  - writable, append mode: created if missing; existing contents are kept
 *  - writable, otherwise:   created if missing, truncated if present
 *
 * @throws TTransportException (NOT_OPEN) if the file cannot be opened
 */
void TFileTransport::openLogFile()
{
#ifndef _WIN32
  mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
  int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT;
  if (!readOnly_) {
    flags |= appendMode_ ? O_APPEND : O_TRUNC;
  }
  fd_ = ::open(filename_.c_str(), flags, mode);
  offset_ = 0;

  // make sure open call was successful
  if (fd_ == -1) {
    int errno_copy = errno;
    TLogging::log_err("TFileTransport: openLogFile() ::open() file: %s, errno: %d", filename_.c_str(), errno_copy);
    throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
  }
#else
  DWORD mode = readOnly_ ? FILE_SHARE_READ : FILE_SHARE_READ | FILE_SHARE_WRITE;
  DWORD flags = readOnly_ ? GENERIC_READ : GENERIC_READ | GENERIC_WRITE;
  // Creation dispositions are mutually exclusive values, not bit flags
  // (OPEN_ALWAYS | TRUNCATE_EXISTING == TRUNCATE_EXISTING, which fails when
  // the file does not exist). Pick the one matching the POSIX flags above.
  DWORD disposition;
  if (readOnly_) {
    disposition = OPEN_EXISTING;
  } else if (appendMode_) {
    disposition = OPEN_ALWAYS;
  } else {
    disposition = CREATE_ALWAYS;
  }
  fd_ = ::CreateFileA(filename_.c_str(), flags, mode, 0, disposition, FILE_ATTRIBUTE_NORMAL, 0);
  offset_ = 0;

  // make sure open call was successful
  if (fd_ == INVALID_HANDLE_VALUE) {
    int errno_copy = ::GetLastError();
    TLogging::log_err("TFileTransport: openLogFile() ::open() file: %s, errno: %d", filename_.c_str(), errno_copy);
    throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
  }
#endif
}

void TFileTransport::getNextFlushTime(int64_t* ts_next_flush)
{
  // Next flush deadline: the current time (in microseconds) plus flushMaxUs_.
  const int64_t nowUs = Util::currentTimeUsec();
  *ts_next_flush = nowUs + flushMaxUs_;
}

TFileTransportBuffer::TFileTransportBuffer(uint32_t size)
  : bufferMode_(WRITE)
  , writePoint_(0)
  , readPoint_(0)
  , size_(size)
{
  // One pointer slot per event. Slots are left uninitialized: only the first
  // writePoint_ entries (filled by addEvent) are ever read or deleted.
  buffer_ = new eventInfo*[size_];
}

TFileTransportBuffer::~TFileTransportBuffer()
{
  if (!buffer_) {
    return;
  }

  // Free the events added since the last reset, then the slot array itself.
  uint32_t idx = 0;
  while (idx < writePoint_) {
    delete buffer_[idx];
    ++idx;
  }
  delete[] buffer_;
  buffer_ = NULL;
}

bool TFileTransportBuffer::addEvent(eventInfo* event)
{
  // Writing while in READ mode is reported but still allowed to proceed.
  if (bufferMode_ == READ) {
    TLogging::log_err("Trying to write to a buffer in read mode");
  }

  // No free slot left: the caller must handle the full buffer.
  if (writePoint_ >= size_) {
    return false;
  }

  buffer_[writePoint_] = event;
  ++writePoint_;
  return true;
}

eventInfo* TFileTransportBuffer::getNext()
{
  // The first read switches the buffer from WRITE to READ mode.
  if (bufferMode_ == WRITE) {
    bufferMode_ = READ;
  }

  // Every written event has already been handed out.
  if (readPoint_ >= writePoint_) {
    return NULL;
  }

  eventInfo* next = buffer_[readPoint_];
  ++readPoint_;
  return next;
}

void TFileTransportBuffer::reset()
{
  // Emit a debug notice while still in WRITE mode or when unread events remain.
  const bool shouldWarn = (bufferMode_ == WRITE) || (readPoint_ < writePoint_);
  if (shouldWarn) {
    TLogging::log_debug("Resetting a buffer with unread entries");
  }

  // Release every event stored since the previous reset.
  uint32_t idx = 0;
  while (idx < writePoint_) {
    delete buffer_[idx];
    ++idx;
  }

  // Return to an empty buffer that accepts writes.
  bufferMode_ = WRITE;
  writePoint_ = 0;
  readPoint_ = 0;
}

bool TFileTransportBuffer::isFull()
{
  // Full once every one of the size_ slots has been written.
  const bool noFreeSlots = (size_ == writePoint_);
  return noFreeSlots;
}

bool TFileTransportBuffer::isEmpty()
{
  // Empty when nothing has been written since construction or the last reset.
  const bool nothingWritten = (0 == writePoint_);
  return nothingWritten;
}

TFileProcessor::TFileProcessor(const shared_ptr<TProcessor>& processor,
                               const shared_ptr<TProtocolFactory>& protocolFactory,
                               const shared_ptr<TFileReaderTransport>& inputTransport)
  : processor_(processor)
  , inputProtocolFactory_(protocolFactory)
  , outputProtocolFactory_(protocolFactory)
  , inputTransport_(inputTransport)
  , outputTransport_(new TNullTransport())
{
  // The same factory serves input and output. Output goes to a null
  // transport, the common case where responses are discarded.
}

TFileProcessor::TFileProcessor(const shared_ptr<TProcessor>& processor,
                               const shared_ptr<TProtocolFactory>& inputProtocolFactory,
                               const shared_ptr<TProtocolFactory>& outputProtocolFactory,
                               const shared_ptr<TFileReaderTransport>& inputTransport)
  : processor_(processor)
  , inputProtocolFactory_(inputProtocolFactory)
  , outputProtocolFactory_(outputProtocolFactory)
  , inputTransport_(inputTransport)
  , outputTransport_(new TNullTransport())
{
  // Separate input/output protocol factories. Output goes to a null
  // transport, the common case where responses are discarded.
}

TFileProcessor::TFileProcessor(const shared_ptr<TProcessor>& processor,
                               const shared_ptr<TProtocolFactory>& protocolFactory,
                               const shared_ptr<TFileReaderTransport>& inputTransport,
                               const shared_ptr<TTransport>& outputTransport)
  : processor_(processor)
  , inputProtocolFactory_(protocolFactory)
  , outputProtocolFactory_(protocolFactory)
  , inputTransport_(inputTransport)
  , outputTransport_(outputTransport)
{
  // Everything is supplied by the caller, including the output transport.
}

/**
 * Replays events from the input transport through the processor.
 *
 * @param numEvents stop after this many events have been processed;
 *                  0 means no limit.
 * @param tail      when true, the input transport is switched to
 *                  TFileTransport::TAIL_READ_TIMEOUT and EOF does not end
 *                  processing; the caller's read timeout is restored before
 *                  returning.
 *
 * Any TException other than TEOFException is written to cerr and ends
 * processing.
 */
void TFileProcessor::process(uint32_t numEvents, bool tail)
{
  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);

  // Save the caller's read timeout so it can be restored before returning.
  int32_t oldReadTimeout = inputTransport_->getReadTimeout();
  if (tail) {
    inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
  }

  uint32_t numProcessed = 0;
  while (1) {
    // bad form to use exceptions for flow control but there is really
    // no other way around it
    try {
      processor_->process(inputProtocol, outputProtocol, NULL);
      numProcessed++;
      if ((numEvents > 0) && (numProcessed == numEvents)) {
        // break (not return) so the read timeout below is still restored
        break;
      }
    }
    catch (TEOFException&) {
      // when tailing, EOF just means "no new events yet": keep reading
      if (!tail) {
        break;
      }
    }
    catch (TException& te) {
      cerr << te.what() << endl;
      break;
    }
  }

  // restore old read timeout
  if (tail) {
    inputTransport_->setReadTimeout(oldReadTimeout);
  }
}

/**
 * Processes events from the input transport until the transport moves past
 * the chunk it was in when this call started, hits EOF, or a TException is
 * thrown (which is logged and ends processing).
 */
void TFileProcessor::processChunk()
{
  shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
  shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);

  uint32_t curChunk = inputTransport_->getCurChunk();

  while (1) {
    // bad form to use exceptions for flow control but there is really
    // no other way around it
    try {
      processor_->process(inputProtocol, outputProtocol, NULL);
      if (curChunk != inputTransport_->getCurChunk()) {
        break;
      }
    }
    catch (TEOFException&) {
      break;
    }
    catch (TException& te) {
      // log_err is printf-style; never pass exception text as the format
      // string, since a '%' in the message would be interpreted as a
      // conversion specifier.
      TLogging::log_err("%s", te.what());
      break;
    }
  }
}

}
}
} // apache::thrift::transport
